package org.frameworkset.schedule;

import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.frameworkset.util.Assert;


public class ThreadPoolTaskScheduler  extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

private volatile int poolSize = 1;

private volatile Boolean removeOnCancelPolicy;

private volatile ScheduledExecutorService scheduledExecutor;

private volatile ErrorHandler errorHandler;


/**
* Set the ScheduledExecutorService's pool size.
* Default is 1.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setPoolSize(int poolSize) {
Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
this.poolSize = poolSize;
if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
	((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize);
}
}

/**
* Set the same property on ScheduledExecutorService (JDK 1.7+).
* There is no default. If not set, the executor property is not set.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setRemoveOnCancelPolicy(boolean removeOnCancelPolicy) {
this.removeOnCancelPolicy = removeOnCancelPolicy;
if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
	((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(removeOnCancelPolicy);
}
}

/**
* Set a custom {@link ErrorHandler} strategy.
*/
public void setErrorHandler(ErrorHandler errorHandler) {
Assert.notNull(errorHandler, "'errorHandler' must not be null");
this.errorHandler = errorHandler;
}

@Override
protected ExecutorService initializeExecutor(
	ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

this.scheduledExecutor = createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);

if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor && this.removeOnCancelPolicy != null) {
	((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(this.removeOnCancelPolicy);
}

return this.scheduledExecutor;
}

/**
* Create a new {@link ScheduledExecutorService} instance.
* <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
* Can be overridden in subclasses to provide custom {@link ScheduledExecutorService} instances.
* @param poolSize the specified pool size
* @param threadFactory the ThreadFactory to use
* @param rejectedExecutionHandler the RejectedExecutionHandler to use
* @return a new ScheduledExecutorService instance
* @see #afterPropertiesSet()
* @see java.util.concurrent.ScheduledThreadPoolExecutor
*/
protected ScheduledExecutorService createExecutor(
	int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}

/**
* Return the underlying ScheduledExecutorService for native access.
* @return the underlying ScheduledExecutorService (never {@code null})
* @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
*/
public ScheduledExecutorService getScheduledExecutor() throws IllegalStateException {
Assert.state(this.scheduledExecutor != null, "ThreadPoolTaskScheduler not initialized");
return this.scheduledExecutor;
}

/**
* Return the underlying ScheduledThreadPoolExecutor, if available.
* @return the underlying ScheduledExecutorService (never {@code null})
* @throws IllegalStateException if the ThreadPoolTaskScheduler hasn't been initialized yet
* or if the underlying ScheduledExecutorService isn't a ScheduledThreadPoolExecutor
* @see #getScheduledExecutor()
*/
public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() throws IllegalStateException {
Assert.state(this.scheduledExecutor instanceof ScheduledThreadPoolExecutor,
		"No ScheduledThreadPoolExecutor available");
return (ScheduledThreadPoolExecutor) this.scheduledExecutor;
}

/**
* Return the current pool size.
* <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
* @see #getScheduledThreadPoolExecutor()
* @see java.util.concurrent.ScheduledThreadPoolExecutor#getPoolSize()
*/
public int getPoolSize() {
if (this.scheduledExecutor == null) {
	// Not initialized yet: assume initial pool size.
	return this.poolSize;
}
return getScheduledThreadPoolExecutor().getPoolSize();
}

/**
* Return the current setting of removeOnCancelPolicy.
* <p>Requires an underlying {@link ScheduledThreadPoolExecutor} and JDK 1.7+.
*/
public boolean isRemoveOnCancelPolicy() {
if (this.scheduledExecutor == null) {
	// Not initialized yet: return false (the default of the executor)
	return false;
}
return getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy();
}

/**
* Return the number of currently active threads.
* <p>Requires an underlying {@link ScheduledThreadPoolExecutor}.
* @see #getScheduledThreadPoolExecutor()
* @see java.util.concurrent.ScheduledThreadPoolExecutor#getActiveCount()
*/
public int getActiveCount() {
if (this.scheduledExecutor == null) {
	// Not initialized yet: assume no active threads.
	return 0;
}
return getScheduledThreadPoolExecutor().getActiveCount();
}


// SchedulingTaskExecutor implementation

@Override
public void execute(Runnable task) {
Executor executor = getScheduledExecutor();
try {
	executor.execute(errorHandlingTask(task, false));
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public void execute(Runnable task, long startTimeout) {
execute(task);
}

@Override
public Future<?> submit(Runnable task) {
ExecutorService executor = getScheduledExecutor();
try {
	return executor.submit(errorHandlingTask(task, false));
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getScheduledExecutor();
try {
	Callable<T> taskToUse = task;
	if (this.errorHandler != null) {
		taskToUse = new DelegatingErrorHandlingCallable<T>(task, this.errorHandler);
	}
	return executor.submit(taskToUse);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ListenableFuture<?> submitListenable(Runnable task) {
ExecutorService executor = getScheduledExecutor();
try {
	ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
	executor.execute(errorHandlingTask(future, false));
	return future;
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getScheduledExecutor();
try {
	ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
	executor.execute(errorHandlingTask(future, false));
	return future;
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public boolean prefersShortLivedTasks() {
return true;
}


// TaskScheduler implementation

@Override
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
	ErrorHandler errorHandler =
			(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
	return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
	return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
	return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
	return executor.scheduleAtFixedRate(errorHandlingTask(task, true), 0, period, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay) {
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
	return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), initialDelay, delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
	return executor.scheduleWithFixedDelay(errorHandlingTask(task, true), 0, delay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
	throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}


private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) {
return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask);
}


private static class DelegatingErrorHandlingCallable<V> implements Callable<V> {

private final Callable<V> delegate;

private final ErrorHandler errorHandler;

public DelegatingErrorHandlingCallable(Callable<V> delegate, ErrorHandler errorHandler) {
	this.delegate = delegate;
	this.errorHandler = errorHandler;
}

@Override
public V call() throws Exception {
	try {
		return this.delegate.call();
	}
	catch (Throwable t) {
		this.errorHandler.handleError(t);
		return null;
	}
}
}

}
